package com.atguigu.flink.sql.function;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * Created by Smexy on 2023/11/20

        UDTF函数，表生成函数。
            类似hive中的explode()

 将字符串按照_拆分后输出拆分后的每一个子串及拆分前字符串的长度。如果id中含有s3不处理。
 输入:                     输出
 s1_1,2000,10              s1,4
                           1,4
 */
public class Demo4_UDTFunctionCustom
{
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String createTableSql = "CREATE TABLE t1 (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT " +
            ")  WITH (" +
            "  'connector' = 'filesystem',   " +
            "  'path' = 'data/ws3.json', " +
            "  'format' = 'json'             " +
            ")";

        tableEnv.executeSql(createTableSql);

        //创建函数对象
        MyUDTF myFunction = new MyUDTF();
        //为函数对象起个名字
        tableEnv.createTemporaryFunction("myUDTF",myFunction);
        /*
            hive中如果要查询除了UDTF炸裂的字段外的其他字段，必须使用lateral view，它会被翻译为 join
            flink不支持这个语法，就直接使用join
         */
        tableEnv.sqlQuery("select id, a, b from t1" +
                    //" join LATERAL TABLE(myUDTF(id)) ON TRUE  "
                    " left join LATERAL TABLE(myUDTF(id)) ON TRUE  "
                )
                .execute()
                .print();

    }

    /*
        TableFunction<T>: T是输出一行的类型，可以使用POJO封装一行的多个列，或者使用Row(省事)
        Row： 代表一行，其中可以按照顺序，存放N个列
       --------------------------------
        Hive中explode必须和lateral view一起使用
            select
                列名1，列名2,xxx
            from xx
            lateral view explode(x) 临时表名 as 列名1，列名2

            lateral view语法会被翻译为 join


     */
    //为表生成函数输出到每一行的每个字段起名字，提供类型的说明
    @FunctionHint(output = @DataTypeHint("Row<a STRING,b INT>"))
    public static class MyUDTF extends TableFunction<Row>
    {
        //定义一个名为eval的方法，不能有返回值。
        public void eval(String id){
            if (!id.contains("s3")){
                int length = id.length();
                String[] words = id.split("_");
                for (String word : words) {
                    //输出一行
                    collect(Row.of(word,length));
                }
            }
        }
    }


}
